/**
*@author tonychao 
*功能 接管用户提交任务， 然后使用Spark-submit进行处理
*/
package shellInterface;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.parsers.DocumentBuilder;

import org.json.JSONException;

import com.mysql.cj.util.StringUtils;

import FileIO.OutPutFile;
import fetcher.SparkConfiguration;
import graybox.GrayBoxConf;
import huristic.HuristicConf;
import mysql.DB;
import mysql.DataFile;
import others.Global;
import others.Global.SparkTuneOption;

public class UserSubmitInterface_test {
	//因为调优假设只有一个任务在同时运行，所以，就使用了如此僵硬的静态函数写法，其实应该写成单例模式
	static int inputFileSize;
	static Global.SparkMode sparkMode;
	static String application_path=null,className=null;
	static ArrayList<String> rawSparkOptions= new ArrayList<String>();//Spark 参数 conf 形式
	static ArrayList<String> appArguments = new ArrayList<String>(); // jar 包参数，编程时指定

	public static void main(String[] args) throws Exception {
		
		//从配置文件获取参数设置
		try {
			@SuppressWarnings("unused")
			Global aGlobalStatics=new Global();
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		UIOutPut("当前的操作系统是"+OSinfo.getFullOSName()+"\t版本号:"+System.getProperty("os.version")+"\t架构:"+System.getProperty("os.arch"));
		
		//解析命令，参数直接存到静态变量里面了
        try {
			parseArgs(args);
		} catch (IOException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
        
		//计算应用的MD5
		String programMD5 = null;
		try {
			programMD5 = getMd5ByPath(application_path);
			UIOutPut("获得应用MD5：\t"+programMD5);
		} catch (NoSuchAlgorithmException e) {
			// TODO Auto-generated catch block
			UIErr("无法计算MD5！");
			e.printStackTrace();
			System.exit(1);
		}
        
		//关于是否使用数据库的逻辑
		if (Global.USE_DB_Flag){
	        //尝试链接数据库
			boolean DBConnected=true;
			DB db= new DB();
			try {
				db.initConnection();
			} catch (ClassNotFoundException e1) {
				// TODO Auto-generated catch block
				DBConnected=false;
				e1.printStackTrace();
			} catch (SQLException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
				DBConnected=false;
			}
			//如果数据库没有成功连接，则仅使用启发式优化
			if (!DBConnected){
				UIOutPut("未能成功连接数据库:\t"+Global.CURRENT_DB +"\t with user name:"+Global.USER_NAME+"\t and password:"+ Global.PASS_WORD);
				UIOutPut("仅使用应用性能模型进行启发式优化");
				String ret=callSparkSubmit(programMD5,className,rawSparkOptions,application_path,appArguments,db,SparkTuneOption.HURISTIC_ONLY);//direct:仅仅使用启发式优化参数
			}
			// 如果是新的应用则注册之
			else if (!db.isRegisted(programMD5,className)){
				UIOutPut("此应用未注册");
				UIOutPut("仅使用应用性能模型进行启发式优化");
				db.registe(programMD5,className);
				String ret=callSparkSubmit(programMD5,className,rawSparkOptions,application_path,appArguments,db,SparkTuneOption.HURISTIC_ONLY);
				String app_id= parseApplicationIDfromReturnData_V2_1(ret);
				if(!app_id.equals("-1")) db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
			}
			//提交的任务已注册但是未建立模型，也没有办法优化
			else if(!db.hasModel(programMD5,className)){
				UIOutPut("此应用已注册，但没有模型");
				UIOutPut("仅使用应用性能模型进行启发式优化");
				String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,db,SparkTuneOption.HURISTIC_ONLY);;
				String app_id= parseApplicationIDfromReturnData_V2_1(ret);
				if(!app_id.equals("-1"))  db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
			}
			//提交的任务已经建立模型 则启动优化
			else{		
				UIOutPut("此应用已注册，且有模型。可以进行优化"); 
				UIOutPut("使用应用性能模型启发式优化+历史任务建模参数空间搜索进行优化");
				String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,db,SparkTuneOption.HURISTIC_AND_GRAYBOX);//graybox:使用启发式优化+黑盒优化技术
				String app_id= parseApplicationIDfromReturnData_V2_1(ret);
				if(!app_id.equals("-1"))  db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
			}
			db.close();
		}
		else{//如果不使用数据库，则使用本地文件进行存储
			SparkTuneOption option=null;
			UIOutPut("使用本地文件代替数据库进行存储");
			DataFile dataFile= new DataFile(programMD5, className);
			if (!dataFile.isRegisted(programMD5, className)){
				String ret=dataFile.registe(programMD5, className);
				UIOutPut("创建新的数据转储文件"+ret);
				option=SparkTuneOption.HURISTIC_ONLY;
			}else {
				UIOutPut("数据转储文件已存在");
				if (!dataFile.hasModel(programMD5, className)){
					UIOutPut("尚未建立模型，仅仅进行启发式优化");
					option=SparkTuneOption.HURISTIC_ONLY;
				}
				else{
					UIOutPut("使用模型进行优化");
					option=SparkTuneOption.HURISTIC_AND_GRAYBOX;
				}
				
			}
			//double accRatio=db.getaccRatio(programMD5,className);
			//UIOutPut("*********预计减少运行时间:"+accRatio+"***********" );
			String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,null,option); //graybox:使用灰盒调优模型
			String app_id= parseApplicationIDfromReturnData_V2_1(ret);
			//app_id="application_1554800229594_0007"; //测试使用
			if(!app_id.equals("-1")) dataFile.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS,inputFileSize);
		}
        System.exit(0);
	}
	
	/**
	 * file_size_[input_file_size_IN_MB] spark-submit  [options] <app jar | python file> [app arguments]
	*用于解析用户的命令
	*/
	private static void parseArgs(String[] args) throws IOException, InterruptedException, SQLException, JSONException, ParseException {
		inputFileSize=-1;
		boolean app_flag=false;
		String inputDataParh="";

		ArrayList<String> argList= new ArrayList<String>(Arrays.asList(args));
		//for (String term : argList) System.out.println(term);
		
		for (String arg : argList){
			if (arg.equals("spark-submit")) continue;
			else if (arg.matches("^file_size_.+")){
				arg=arg.substring(10, arg.length());
				inputFileSize=Integer.parseInt(arg);
				
			}
			//因为分割字符串的坑爹方法，所以要过滤 backslash
			else if (arg.equals("\\")) continue;
			//如果是提交任务
			else if (arg.matches(".*\\.jar")||arg.matches(".*\\.py")){
				app_flag=true;
				application_path=arg;
			}
			//任务已提交，说明剩下的是app 的自己的参数，不能够修改，原样输出
			else if(app_flag){
				appArguments.add(arg);
				if(inputDataParh.equals("")&&arg.matches("^hdfs://.+")){
					inputDataParh=arg;
					//inputFileSize=getFileSizeByHDFS(inputDataParh);
				};
			}
			//任务未提交 说明是spark的配置参数
			else{
				rawSparkOptions.add(arg);
			}
		}
		
		//解析 spark 部署模式 和 claseName
		ArrayList<String> userDefinedParameters=(ArrayList<String>) rawSparkOptions.clone();
		for (int i=0;i<userDefinedParameters.size();i++){
			String term= userDefinedParameters.get(i);
			if (term.equals("--conf")){	//配置
				userDefinedParameters.remove(i);
				i--;
			}
			else if (term.equals("--class")) { //对于一个jar包打包的Spark应用而言，整体由 jar包+class 决定了实际执行的程序
				className=userDefinedParameters.get(i+1); //如果是spark class 类
				userDefinedParameters.remove(i);		//delete --class 以及 class 的内容
				userDefinedParameters.remove(i);
				i--;
			}
			else if (term.equals("--master")){ 			 //模式代码 目前支持  “yarn”
				String tmp=userDefinedParameters.get(i+1);
				switch (tmp) {
				case "yarn":
					sparkMode=Global.SparkMode.SPARK_ON_YARN;
					break;
				default:
					sparkMode=Global.SparkMode.SPARK_STAND_ALONE;
					break;
				}
				i++;
			}
		}
		//System.out.println(optList);
		//System.out.println(rawOptions);
		
		
		//这说明没有找到提交的任务文件，执行失败，弹出错误信息
		if (!app_flag){
			UIErr("命令不符合规范！");
			UIErr("用法："+" file_size_[input_file_size_IN_MB] spark-submit [options] <app jar | python file> [app arguments]");
			System.exit(1);
		}
	}

	
	/**
	 * 从返回字符串中获得app-id linux 
	 * @return String app-id
	 */
	public static String parseApplicationIDfromReturnData_V2_1(String ret) {
		//StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170729084629-0000
		String mesosMode="Framework registered with ";
		String standAloneMode="app ID ";
		//String yarnMode="Submitting application "; //Submitting application application_1532181494552_0007 to ResourceManager
		String yarnMode="(application_\\d+_\\d+)";
		if (ret==null) return "-1";
		int index=ret.indexOf(standAloneMode);
		if(index !=-1){
			index+=standAloneMode.length();
			ret=ret.substring(index);
			String[] list= ret.split("[ \t\n]+");
			ret=list[0];
			if (ret.equals("")) ret=list[1];
			return ret;
		}
		
		index=ret.indexOf(mesosMode);
		if (index !=-1){
			index=ret.indexOf(mesosMode)+mesosMode.length();
			ret=ret.substring(index);
			String[] list= ret.split("[ \t\n]+");
			ret=list[0];
			if (ret.equals("")) ret=list[1];
			return ret;
		}
		
		Pattern pyarn=Pattern.compile(yarnMode);
		Matcher m = pyarn.matcher(ret);
		
		if (m.find()){
			return m.group(0);
		}
		UIErr("没找到appid");
		return null;

	}

	
	public static int getFileSizeByHDFS(String path) throws IOException{
		Process process=Runtime.getRuntime().exec( "hadoop fs -du -h "+path, null, null);
		InputStream inStream= process.getInputStream();
		Scanner s = new Scanner(inStream);
		Double t=(double) -1;
		while (s.hasNextLine()) {
			String line = s.nextLine();
			String[] tmp=line.split("\\s+");
			t= Double.valueOf(tmp[0]);
			switch(tmp[1]){
			case "M":
			case "m":
				break;
			case "G":
			case "g":
				t=t*1024;
				break;
			case "K":
			case "k":
				t=t/1024;
				break;
			}
			break;

		}
		s.close();
		process.destroy();
		
		return t.intValue();
	}
	
	/**
	 * @param app_md5， className
	 * @param term
	 * @param appArguments
	 * @param string
	 * spark-tune-submit [options] <app jar | python file> [app arguments]
	 * @throws Exception 
	 */
	private static String callSparkSubmit(String app_md5,String className,ArrayList<String> sparkParameters, String programPath, ArrayList<String> appArguments,
			DB db, SparkTuneOption tuneModeOp) throws Exception {
		
		
		if (Global.DEFAULT_OPTION==SparkTuneOption.NOT_SET){ 
			//使用自动选择的调优模式
		}
		else {
			tuneModeOp=Global.DEFAULT_OPTION;
			UIOutPut("用户自己制定的调优方式:");
		}
		
		
		String command= "";
		boolean ifTune=false;
		
		
		if(tuneModeOp==SparkTuneOption.HYBIRD){
			UIOutPut("选择HYBIRD模式进行优化,阈值："+String.valueOf(Global.RETRAIN_COUNT)+"\t");
			UIOutPut("p："+String.valueOf(Global.HYBIRD_p)+"\t");
			UIOutPut("p："+String.valueOf(Global.HYBIRD_K)+"\t");
			int count=new DataFile(app_md5, className).get_succeed_count(app_md5,className);
			if (!(count>=Global.RETRAIN_COUNT)||!(new DataFile(app_md5, className).hasStatisticFileandHeuristicFile())){//数据不够，启发式
				tuneModeOp=SparkTuneOption.HURISTIC_ONLY;
				Global.DEFAULT_OPTION=SparkTuneOption.HURISTIC_ONLY;
				UIOutPut("HYBIRD模式训练数据不够，HURISTIC_ONLY");
			}
			else{//根据条件判别
				//实际逻辑很复杂，因为hybird 方法需要先进行RRS 搜索，然后才能判断，所以处理代码延后到
				//tuneModeOp=figureOutTheSolutionOfHYBIRD();
			}
		}
		
		//逻辑短路,默认交互式命令行应该由用户选择进行调优或者直接执行，但是两者不都为false时，则直接短路
		if (Global.ALWAYS_NOT_TUNE){
			ifTune=false;
			UIOutPut("调优选项短路： ALWAYS_NOT_TUNE 总是不调优");
		}
		else if(Global.ALWAYS_TUNE){
			ifTune=true;
			UIOutPut("调优选项短路：ALWAYS_TUNE 总是调优");
				if(tuneModeOp==SparkTuneOption.GRAYBOX_ONLY&& !(new DataFile(app_md5, className).hasModel(app_md5, className))){//gray box 方法必须要由足够的模型才能使用
					ifTune=false;
					UIOutPut("调优选项短路：GRAYBOX_ONLY,没有模型，按原来参数执行");
				}
				if(tuneModeOp==SparkTuneOption.HURISTIC_AND_GRAYBOX&& !(new DataFile(app_md5, className).hasModel(app_md5, className))){//gray box 方法必须要由足够的模型才能使用
					ifTune=false;
					UIOutPut("调优选项短路：HURISTIC_AND_GRAYBOX,没有模型，按原来参数执行");
				}
				

		}

		else {		//用户自己选
		Scanner s = new Scanner(System.in);
		UIOutPut("是否启用优化？请输入Y/N");
		outerLable:
			while(s.hasNext()){
				switch (s.next()) {
				case "Y":
				case "y":
				case "Yes":
				case "YES":
				case "yes":
					ifTune=true;
					break outerLable;
				case "N":
				case "n":
				case "No":
				case "NO":
				case "no":
					ifTune=false;
					break outerLable;
				default:
					UIOutPut("请输入Y/N");
					break;
				}
			}
		}

		if(ifTune){
			UIOutPut("启动优化模式");
			//使用优化参数
			sparkParameters=getTunedParameters(app_md5,className,tuneModeOp,db);
			if (className!=null) {
				sparkParameters.add(0, "--class "+className+" ");
			}
			if (sparkMode==Global.SparkMode.SPARK_ON_YARN){
				sparkParameters.add(1, "--master yarn ");
			}
		}
		else {
			UIOutPut("不启动优化模式");
			//啥都不干 sparkParameters
			sparkParameters=sparkParameters;
		}
		
	

		//重新拼装命令	
		for (String str :sparkParameters){
			command=command+str+" ";
		}
		//将命令拼好，然后提交给shell 执行
		command=command+programPath+" ";
		for (String str:appArguments){
			command=command+str+" ";
		}
		
		ArrayList<String> commandArray =new ArrayList<String>();
		commandArray.add("spark-submit "+command);
		
		//同步进程块2 ：此时已经完成命令字符串的拼装
		{	
			OutPutFile f= new OutPutFile("./spark_tuned_command.json");
			f.addLine(commandArray.get(0));
			f.close();
			UIOutPut("SYNCHRONOUS_CONTROL_OUT_PUT2");
			//System.exit(0);
			Thread.sleep(300);
		}
		
		ArrayList<String> retStr=LinuxShell.runInSequence(commandArray);
		if (retStr==null) {
			UIErr("没有拿到应用返回字符串");
			return null;
		}
		return retStr.get(0);
	}


	/**
	 *从调优模块哪里拿到优化之后的数据
	 * @param options
	 * 用户输入的参数实际上并没有得到应用
	 * @return
	 * @throws Exception 
	 */
	private static ArrayList<String> getTunedParameters(String app_md5,String className,SparkTuneOption tuneModeOp,DB db) throws Exception {
		
		//启发式部分目前包括 
		//spark.executor.cores
		//spark.executor.memory
		//spark.task.cpus
		//spark.default.parallelism?
		//spark.driver.cores
		//spark.driver.memory
		//spark.default.parallelism
		ArrayList<String> huristicConfList= new HuristicConf(inputFileSize,sparkMode).toConfList(); //启发式优化的参数

		//灰盒模型暂时包括 --使用灰盒模型对应用进行建模--分区
		//shuffle read/shuffle write
		//网络带宽
		//是否要解压缩-计算平衡
		//梯度下降方法暂时包括 --使用梯度下降法进行训练和搜索部分分组的参数
		
		GrayBoxConf grayboxConf=null;
		SparkConfiguration configuration=new SparkConfiguration();

		
		switch (tuneModeOp){
			case HURISTIC_ONLY:
				//仅仅使用启发式优化,跳过
				
				break;
			case HURISTIC_AND_GRAYBOX:
				ArrayList<String> res=null;
				//使用灰盒优化模型
				if (Global.USE_DB_Flag){
					grayboxConf =new GrayBoxConf(app_md5, className);
					configuration=grayboxConf.searchRRS();
					res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group1"))); //这一函数将group1+group2+group3 的 所有参数按照Linux-shell Spark 命令行的规范输出
				}
				else {//不使用DB，而是使用本地文件进行优化
					grayboxConf =new GrayBoxConf(app_md5, className);
					configuration=grayboxConf.searchRRS();
					res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group1")));
				}
				//灰盒优化的参数加入黑盒优化参数中
				huristicConfList.addAll(res);
				break;
				
			case GRAYBOX_ONLY:
				if (Global.USE_DB_Flag){
					grayboxConf =new GrayBoxConf(app_md5, className);
					configuration=grayboxConf.searchRRS();
					res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group0","group1"))); //这一函数将group1+group2+group3 的 所有参数按照Linux-shell Spark 命令行的规范输出
				}
				else {//不使用DB，而是使用本地文件进行优化
					grayboxConf =new GrayBoxConf(app_md5, className);
					configuration=grayboxConf.searchRRS();
					res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group0","group1")));
				}
				huristicConfList=res;
				break;
			case HYBIRD: //混合模式
				if (Global.USE_DB_Flag){
					UserSubmitInterface_test.UIErr("HYBIRD模式在数据库上的开发工作未完成");
				}
				else {//，而是使用本地文件进行优化
					UserSubmitInterface_test.UIOutPut("HYBIRD模式！，有充分数据！");
					grayboxConf =new GrayBoxConf(app_md5, className);
					configuration=grayboxConf.searchRRS();
					
					DataFile tmpData= new DataFile(app_md5, className);
					double t=configuration.f_x-tmpData.get_E_err()-tmpData.get_huristic_time_in_ms();
					
					double k =Global.HYBIRD_K;
					double sigma= tmpData.get_E_STD();
					
					if(t<=-k*sigma){ //所有参数使用机器学习模型
						UserSubmitInterface_test.UIOutPut("HYBIRD模式：$1使用机器学习参数");
						huristicConfList=configuration.toConfList(new ArrayList<String>(Arrays.asList("group0","group1")));
					}
					else if(t>= -k*sigma && t<= k*sigma){ 
						
						double mix_probablity=(k*sigma+t)/(2*k*sigma);
						UserSubmitInterface_test.UIOutPut("HYBIRD模式：扔硬币+\t mix probablity="+String.valueOf(mix_probablity));
						if (Math.random()<=mix_probablity){ //mix 模式
							UserSubmitInterface_test.UIOutPut("HYBIRD模式：$2mix参数");
								res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group1")));
								huristicConfList.addAll(res);
						}
						else { ///所有参数使用机器学习模型
							UserSubmitInterface_test.UIOutPut("HYBIRD模式：$3使用机器学习参数");
							huristicConfList=configuration.toConfList(new ArrayList<String>(Arrays.asList("group0","group1")));
						}
						

					}
					else{//mix 模式
						res=configuration.toConfList(new ArrayList<String>(Arrays.asList("group1")));
						huristicConfList.addAll(res);
						UserSubmitInterface_test.UIOutPut("HYBIRD模式：$4mix参数");
	
					}

					
				}
				
				
				break;
				
			default:
				UIErr("出现问题");
		}
		
		//将configuration 中使用启发式优化的部分进行赋值，仅仅是为了演示的时候统一返回json文件,因为此时huristic中已经有了字符串格式的参数
		configuration.setHuristicConfs(new HuristicConf(inputFileSize,sparkMode));
		System.out.println(configuration.toJsonString()); //打印启发式优化参数
		
		
		//输出预测的性能提升程度
		double accRatio=0;
		try {
			//if (tuneModeOp.equals(SparkTuneOption.HURISTIC_AND_GRAYBOX)&&Global.USE_DB_Flag==false){ //使用数据文件代替数据库
			if (Global.USE_DB_Flag==false){ //使用数据文件代替数据库
				DataFile dataFile= new DataFile(app_md5, className);
				accRatio = dataFile.getassRatio(app_md5,className);
			}
			else if(tuneModeOp.equals(SparkTuneOption.HURISTIC_AND_GRAYBOX)&&Global.USE_DB_Flag==true){ //使用数据库
				accRatio = db.getaccRatio(app_md5,className);
			}
			else { //当没有历史数据的情况下没有办法进行优化效果预测
				UIOutPut("当没有充分的历史数据，无法预测优化效果");
				accRatio=-1;
			}
			
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//加速效果非常明显，就少写一点
		if (accRatio>0.5){
			accRatio*=0.75;
		}
		
		UIOutPut("*********根据历史数据预计减少运行时间:"+accRatio+"***********" );
		
		//将参数的Json格式写入文件
		OutPutFile f= new OutPutFile("./Sparkconf.json");
		OutPutFile f2=new OutPutFile("./Speedupratio.double");
		f.addLine(configuration.toJsonString());
		f2.addLine(String.valueOf(accRatio));
		f.close();
		f2.close();
		//同步进程块1
		{
			//UIOutPut("SYNCHRONOUS_CONTROL_OUT_PUT1");
			//System.exit(0);
			//Thread.sleep(300);
		}
		
		return huristicConfList;
	}



	/**
	 * Spark可以使用本地文件提交程序
	 * @return File 非空 代表本地磁盘路径下存在此文件，null代表不存在此文件
//         System.err.println("FILE");
//         System.out.println(byteBuffer);
//         for ( int i =0;i<byteBuffer.limit();i++){
//        	 System.out.print(byteBuffer.get(i));
//         }
//         
        // System.out.println("");
         
	 */
	private static File getFileOnLocalFileSystem(String pathStr){
		Path path = FileSystems.getDefault().getPath(pathStr);
		File aFile= new File(path.toString());
		return aFile;
	}


	//向用户输出
	public static void UIOutPut(String line) {
		System.out.println("[调优模块]  "+line);

		System.out.flush();// 必须通过flush将字节流清空
	}
	
	public void UIWarn(String line){
		System.err.println("NOT DEFINED");
		System.err.flush();// 必须通过flush将字节流清空
	}
	
	public static void UIErr(String line){
		System.err.println("[调优模块 错误]  "+line);
		System.err.flush();// 必须通过flush将字节流清空
	}
	

	
	
	/**
	 * 在文件系统下计算MD5值
	 * @param file
	 * @return
	 * @throws FileNotFoundException
	 */
	 public static String getMd5ByFile(File file) throws FileNotFoundException {  

         String value = null;  
         FileInputStream in = new FileInputStream(file);  
     try {  
         MappedByteBuffer byteBuffer = in.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length());  
         MessageDigest md5 = MessageDigest.getInstance("MD5");
         
//         System.err.println("FILE");
//         System.out.println(byteBuffer);
//         for ( int i =0;i<byteBuffer.limit();i++){
//        	 System.out.print(byteBuffer.get(i));
//         }
//         
        // System.out.println("");
         
         md5.update(byteBuffer); 
         BigInteger bi = new BigInteger(1, md5.digest());  
         value = bi.toString(16);  
     } catch (Exception e) {  
         e.printStackTrace();  
     } finally {  
             if(null != in) {  
                 try {  
                 in.close();  
             } catch (IOException e) {  
                 e.printStackTrace();  
             }  
         }  
     }  
     return value;  
     }  
 
	/**
	 * 
	 * @param pathStr
	 * @return
	 * @throws IOException 
	 * @throws NoSuchAlgorithmException 
	 * Null 如果没有根据文件路径找到文件 ，否则返回文件的MD5值
	 */
	 public static String getMd5ByPath(String pathStr) throws IOException, NoSuchAlgorithmException{
		File localFile;
		//如果文件在本地硬盘上，并且能打开
		if ((localFile=getFileOnLocalFileSystem(pathStr))!=null&&(localFile.exists())){
			return getMd5ByFile(localFile);
		}
		//如果文件在HDFS上能够找到，暂时屏蔽
		/*
		else if(HDFSFetcher_test.isFileOnHDFS(pathStr)){
			return HDFSFetcher_test.getHDFSMD5(pathStr);
		}
		*/
		//如果文件在HDFS和本地硬盘上都打不开，说明有问题
		else {
			UIErr("无法找到文件\" "+pathStr+"\"\t\""+pathStr+"\"既不是HDFS上的文件，也不是本地文件");
			return null;
		}
	}
	 
	
	 /**
	  * 
	  * @param str 需要校验的字符串
	  * @return MD5
	  * @throws NoSuchAlgorithmException
	  */
	 public static String getMD5ByString(String str) throws NoSuchAlgorithmException{
		 str="spark-submit file:///home/tonychao/spark-2.0.2-bin-hadoop2.6/README.md";
	    MessageDigest md5 = MessageDigest.getInstance("MD5");  
	    byte[] a= str.getBytes();

	    md5.update(a); 
	    BigInteger bi = new BigInteger(1, md5.digest());  
	    String value = bi.toString(16);
	    return value; 
	}
}
